博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【SparkSQL】DataSet、DataFrame 介绍
阅读量:329 次
发布时间:2019-03-04

本文共 5968 字,大约阅读时间需要 19 分钟。

【SparkSQL】DataSet、DataFrame 介绍


目录:

    
    
    
    

    
    
    


一、DataSet介绍

1.DataSet是什么?

DataSet 是一个强类型,并且类型安全的数据容器,并且提供了结构化查询API和类似RDD一样的命令式API

2.DataSet查询方式

  • 环境配置
{
// 创建SparkSessionval spark = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate()// 导入隐式转换,这里的spark是上面创建的对象import spark.implicits._// 演示创建DataSetval sourceRDD = spark.sparkContext.parallelize(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)))val dataset = sourceRDD.toDS()}// 创建样例类case class person(name:String,age:Int,score:Double)
  • dataset支持RDD似的强类型API
// 1.dataset支持RDD似的强类型API    // 每一个item就是一个person类型的对象,直接通过对象获取    dataset.filter(item => item.age < 20 ).show()
  • dataset支持弱类型的API
// 2.dataset支持弱类型的API    // 直接通过结构中的字段名取获取    dataset.filter('age < 20 ).show()    dataset.filter($"age" < 20 ).show()
  • dataset 支持直接通过SQL表达式查询
// 3.dataset 支持直接通过SQL表达式查询    dataset.filter("age < 20").show()

在这里插入图片描述


3.DataSet底层是什么?

  • 执行优化
dataset.explain(true)

无论是否执行sql,dataset都会被优化器优化

在这里插入图片描述

  • DataSet底层形式

DataSet最底层处理的是对象的序列化形式。通过查看DataSet生成的物理执行计划,也就是最终处理的RDD,就可以判定DataSet底层处理的是什么形式的数据。

val sourceRDD = spark.sparkContext.parallelize(Seq(person("a", 18, 98.0), person("b", 20, 97.0),                 person("c", 18, 100.0)))val dataset = sourceRDD.toDS()val execution = dataset.queryExecution.toRdd
  • dataset.queryExecution.toRdd 这个API可以看到DataSet底层执行的RDD,这个RDD中的泛型是InternalRow
  • InternalRow 又被称为Catalyst Row,是DataSet底层的数据结构
  • 无论DataSet中放置的是什么类型对象,最终执行计划中的RDD上都是 InternalRow
    在这里插入图片描述
  • 如上图所示,数据集在读取后转为DataSet交由优化器优化,再转为InternalRow发送到集群上运行。在DataSet的泛型对象执行前,需要通过Encoder转换为InternalRow;而在读取转为DataSet的时候,是由Decoder来进行转化的


4.DataSet转为同类型的RDD(DataSet.rdd)

// 演示创建DataSet//val sourceRDD = spark.sparkContext.parallelize(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0))) //val dataset: Dataset[person] = sourceRDD.toDS() val dataset: Dataset[person] = spark.createDataset(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)))
  • 直接获取到已经分析和解析过的 DataSet 的执行计划,从中拿到 RDD
// 直接获取到已经分析和解析过的 DataSet 的执行计划,从中拿到 RDD val execution: RDD[InternalRow] = dataset.queryExecution.toRdd
  • 将 DataSet 底层的 rdd[InternalRow] 通过 Decoder 转成了和 DataSet 一样类型的 RDD
// 将 DataSet 底层的 rdd[InternalRow] 通过 Decoder 转成了和 DataSet 一样类型的 RDD 。 val typeRDD: RDD[person] = dataset.rdd
// toDebugString 查看执行过程 println(execution.toDebugString) println("*****************************") println(typeRDD.toDebugString) dataset.show() typeRDD.collect().foreach(println(_))

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


二、DataFrame介绍

1.DataFrame是什么?

DataFrame是SparkSQL中一个表示关系型数据库中 表 的函数式抽象,其作用是让Spark处理大规模结构化数据的时候更加容易:

  • 一般DataFrame可以处理结构化数据,或者是半结构化数据,因为这两类数据中都可以获取到Schema信息,也就是说DataFrame中有Schema信息
  • 也可以像操作表一样操作DataFrame
  • DataFrame与RDD结构对比:
    在这里插入图片描述
  • DataFrame组成部分:row集合、结构信息schema
  • 一个是row的集合,每个row对象表示一个行
  • 二是描述DataFrame结构的Schema

在这里插入图片描述


2.DataFrame的创建方式

// 演示创建DataFrame    val personList: Seq[person] = Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0))
  • toDF()---- 直接作用于集合型数据集等或通过读取的RDD转化成DataFrame,注意一定要导入隐式转换,否则无法调用toDF()
// 导入隐式转换,这里的spark是上面创建的对象    import spark.implicits._    //  1.toDF()    val dataFrame  = personList.toDF()    val dataFrame1 = spark.sparkContext.parallelize(personList).toDF()

import spark.implicits._

导入隐式转换的作用:

  • 通过隐式转换的内置函数将对应的RDD数据类型,调用toDF()后转为DataFrame类型。
    在这里插入图片描述
  • createDataFrame()
//  2.createDataFrame()    val dataFrame2 = spark.createDataFrame(personList)

createDataFrame()方法支持多种类型的数据转换:这里是引用

  • DataFrameReader ---- 通过sparksql的读取框架,获取DataFrame形式的数据
//  3.DataFrameReader    val dataFrame3 =  spark.read.csv("dataset/BeijingPM20100101_20151231.csv")


3.DataFrame支持的数据操作 ---- 案例

案例:查看北京PM值每月的统计数量

// 配置环境 val spark = SparkSession.builder()   .master("local[6]")   .appName("test")   .getOrCreate() // 导入隐式转换、sql函数 import spark.implicits._ import org.apache.spark.sql.functions._ // 读取数据 val df = spark.read   .option("header",value = true)   .csv("dataset/BeijingPM20100101_20151231.csv") df.show() // 提取数据 // 统计每年每月聚合  // 方式一 --- 命令式API df.select('year,'month,'PM_Dongsi)     // 查询字段   .where('PM_Dongsi =!= "NA")          // 空值处理   .groupBy('year,'month)               // 分组   .agg(count("month") as "count")      // 聚合   .sort('count desc)                   // 排序   .show() // 方式二  --- sql语句    df.createOrReplaceTempView("pm")  // 创建临时表    spark.sql("select year,month,count(PM_Dongsi) as count from pm where PM_Dongsi!='NA' group by year,month order by count desc")      .show()  spark.close()

在这里插入图片描述

  • 查看数据结构信息:
    在这里插入图片描述

总结:

在这里插入图片描述



三、DataFrame与DataSet的区别

1.DataFrame表达的含义是一个支持函数式操作的表,而DataSet表达的是一个类似RDD的东西,DataSet可以处理任何对象

2.DataFrame中存放的是Row对象,而DataSet中可以存放任何类型的对象

type DataFrame = Dataset[Row]

// 演示创建DataFrame val personList = Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)) // 1.DataFrame是弱类型的,DataSet是强类型的 // type DataFrame = Dataset[Row]val DataFrame: DataFrame = personList.toDF()val DataSet: Dataset[person] = personList.toDS()

3.DataFrame的操作方式和DataSet是一样的,但是对于强类型操作而言,他们处理的类型不一样

在这里插入图片描述
在这里插入图片描述

  1. DataFrame 所代表的弱类型编译是不安全的;DataSet 所代表的操作是类型安全的,编译时安全的
    在这里插入图片描述
  • DataFrame 在进行map等操作的时候,不能使用像person这样的scala对象,所以无法做到编译时检查
  • DataSet 表示具体的某一类对象,如person,所以在进行map等操作的时候,传入的是某个具体的scala对象,如果调用错了方法,编译时就会报错

总结:

在这里插入图片描述



四、Row对象

  • 1.Row 是什么,如何创建?

Row 对象表示的是一个 行

Row 的操作类似于 Scala 中的 Map 数据类型
Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。

// 样例类case class People(name:String,age:Integer)// Row对象必须配合Schema对象才会有列名val p: People = People("Jack",20)val row: Row = Row("Tom",18)
  • 2.如何从 Row 中获取数据?
// 类似于数组val name: String = row.getString(0)val age: Int = row.getInt(1)println(name)println(age)val name1: String = p.nameval age1: Int = p.ageprintln(name1)println(age1)

在这里插入图片描述

  • 3.Row 也是样例类
row match {
case Row(name,age) => println(name,age)}

在这里插入图片描述

  1. DataFrame 和 Dataset 之间可以非常简单的相互转换:只需要定义一个样例类,用DataFrame.as[样例类对象]
case class Person(name:String,age:Int)val spark: SparkSession = new sql.SparkSession.Builder()  .appName("hello")  .master("local[6]")  .getOrCreate()import spark.implicits._val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()val ds_fdf: Dataset[People] = df.as[People]val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()val df_fds: DataFrame = ds.toDF()


转载地址:http://lweq.baihongyu.com/

你可能感兴趣的文章